如何实现阻塞队列 您所在的位置:网站首页 reentrantlock condition 如何实现阻塞队列

如何实现阻塞队列

#如何实现阻塞队列| 来源: 网络整理| 查看: 265

前言

Java的线程池中,在核心线程数已满的情况下,任务会存储在阻塞队列中,那么什么是阻塞队列呢?

阻塞队列首先是个队列,在队列的基础上,支持另外两个附加操作:

在队列为空时,获取元素的线程会等待队列变为非空 在队列满时,添加元素的线程会等待队列可用

image.png

那么阻塞队列是如何实现阻塞的?

自己实现一个阻塞队列 Synchronized、wait、notifyAll实现的阻塞队列 public class BlockingQueue { // 放置元素索引 private int inputIndex; // 取出元素索引 private int takeIndex; // 元素数组 private String[] elements; // 数组中元素数量 private int count; public BlockingQueue(int capacity) { elements = new String[capacity]; } public Object take() throws InterruptedException { synchronized(this) { // 这里使用while的原因是线程被唤醒之后需要再判断一次数组是否已经有元素 while (count == 0) { // 数组没有元素了,等待 this.wait(); } Object e = dequeue(); this.notify(); System.out.println("take method: " + Arrays.toString(elements)); return e; } } public void put(String str) throws InterruptedException { synchronized (this) { // 这里使用while的原因是线程被唤醒之后需要再判断一次数组元素是否有空闲位置 while (count == elements.length) { // 数组元素满了,等待 this.wait(); } enqueue(str); System.out.println("put method: " + Arrays.toString(elements)); this.notify(); } } /** * 入队方法 * @param e 元素 */ private void enqueue(String e) { elements[inputIndex] = e; // 如果数组已满,input返回开头 if (++inputIndex == elements.length) { inputIndex = 0; } count ++; } /** * 出队方法 * @return */ private Object dequeue() { Object e = elements[takeIndex]; elements[takeIndex] = null; // 如果takeIndex已到数组终点,返回开头 if (++takeIndex == elements.length) { takeIndex = 0; } count --; return e; } } 复制代码 public static void main(String[] args) { BlockingQueue queue = new BlockingQueue(10); // 10个线程不断放置元素 IntStream.range(0, 10).forEach(i -> { Thread a = new Thread(() -> { try { queue.put("element"); } catch (InterruptedException e) { e.printStackTrace(); } }); a.start(); }); // 10个线程取出元素 IntStream.range(0, 10).forEach(i -> { Thread b = new Thread(() -> { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } }); b.start(); }); } 复制代码

image.png

condition、await、singal实现的阻塞队列 public class BlockingQueueWithCondition { // 放置元素索引 private int inputIndex; // 取出元素索引 private int takeIndex; // 元素数组 private String[] elements; // 数组中元素数量 private int count; ReentrantLock lock = new ReentrantLock(); Condition notEmpty = lock.newCondition(); Condition notFull = lock.newCondition(); public BlockingQueueWithCondition(int capacity) { elements = new String[capacity]; } public String take() throws InterruptedException { lock.lock(); try { // 数组没有元素了,等待 while (count == 0) { notEmpty.await(); } String str = elements[takeIndex]; elements[takeIndex] = null; // 如果takeIndex已到数组终点,返回开头 if (++takeIndex == elements.length) { takeIndex = 0; } notFull.signal(); System.out.println("take method: " + Arrays.toString(elements)); count--; return str; } finally { lock.unlock(); } } public void put(String str) throws InterruptedException { lock.lock(); try { // 数组元素满了,等待 while (count == elements.length) { notFull.await(); } elements[inputIndex] = str; // 如果inputIndex已到数组终点,返回开头 if (++inputIndex == elements.length) { inputIndex = 0; } notEmpty.signal(); System.out.println("put method: " + Arrays.toString(elements)); count++; } finally { lock.unlock(); } } } 复制代码 public static void main(String[] args) { BlockingQueueWithCondition queue = new BlockingQueueWithCondition(10); // 10个线程不断放置元素 IntStream.range(0, 10).forEach(i -> { Thread a = new Thread(() -> { try { queue.put("element"); } catch (InterruptedException e) { e.printStackTrace(); } }); a.start(); }); // 10个线程取出元素 IntStream.range(0, 10).forEach(i -> { Thread b = new Thread(() -> { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } }); b.start(); }); } 复制代码

image.png

ReentrantLock、 Condition(await与signal)与synchronized、wait、notify非常相似,那么两者有什么差别呢?

调用wait时,首先需要确保调用了wait方法的线程已经持有了对象的锁,调用wait后,该线程会释放掉这个对象的锁,进入等待队列(wait set) 当调用notify时,系统会随机唤醒该对象等待队列中任意一个线程,当这个线程被唤醒后,它就会与其它线程一同竞争对象的锁 synchronized获取锁和释放锁都是通过JVM底层来操作,无需开发者关注 ReentrantLock获取锁和释放锁可以由开发者操作,更加灵活,调用await方法的线程会进入对象的等待队列中,调用singal方法时可以指定唤醒某个对象等待队列中的阻塞任务 JDK中的阻塞队列

JDK中提供了非常多的阻塞队列,这里只解析LinkedBlockingQueue,如果理解了上面阻塞队列的写法,可以很快理解JDK阻塞队列的源码

一些重要的参数 // 阻塞队列中的元素会包装成一个节点,有链表必有节点 static class Node { E item; Node next; Node(E x) { item = x; } } // 阻塞队列容量, private final int capacity; // 阻塞队列当前元素个数 private final AtomicInteger count = new AtomicInteger(); // 阻塞队列头节点 transient Node head; // 阻塞队列尾节点 private transient Node last; // LinkedBlockingQueue使用了两把锁,存取互不排斥 // take锁 private final ReentrantLock takeLock = new ReentrantLock(); // 当队列中无元素时,take锁会阻塞,直到被其它线程唤醒 private final Condition notEmpty = takeLock.newCondition(); // put锁 private final ReentrantLock putLock = new ReentrantLock(); // 当队列中元素已满时,put锁会阻塞,直到被其它线程唤醒 private final Condition notFull = putLock.newCondition(); 复制代码 put方法 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // put锁进行加锁 putLock.lockInterruptibly(); try { // 如果队列元素已满,阻塞在notFull条件上 while (count.get() == capacity) { notFull.await(); } // 入队 enqueue(node); // 注意:这里是先获取出队前队列长度,再加一 c = count.getAndIncrement(); // 如果当前队列元素加一还未达队列元素上线,则再唤醒一个线程,因为可能有很多线程阻塞在notFull // 条件上 if (c + 1 < capacity) notFull.signal(); } finally { // put锁解锁 putLock.unlock(); } // 加了一个元素后,唤醒阻塞在notEmpty条件上的线程来取元素 if (c == 0) signalNotEmpty(); } 复制代码 take方法 public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // takeLock进行加锁 takeLock.lockInterruptibly(); try { // 如果链表元素为空,阻塞在notEmpty上 while (count.get() == 0) { notEmpty.await(); } // 元素出队 x = dequeue(); // 注意:这里是先获取出队前队列长度,再减一 c = count.getAndDecrement(); // 如果链表中元素> 1,唤醒指定对象的等待队列中的阻塞任务 if (c > 1) notEmpty.signal(); } finally { // 释放锁 takeLock.unlock(); } // 如果出队前队列长度已满,现在减了一个元素后,唤醒阻塞在notFull条件上的线程 if (c == capacity) signalNotFull(); return x; } 复制代码


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有